{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Using Databricks as a Compute Target from Azure Machine Learning Pipeline\n",
        "To use Databricks as a compute target from [Azure Machine Learning Pipeline](https://aka.ms/pl-concept), a [DatabricksStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.databricks_step.databricksstep?view=azure-ml-py) is used. This notebook demonstrates the use of DatabricksStep in Azure Machine Learning Pipeline.\n",
        "\n",
        "The notebook will show:\n",
        "1. Running an arbitrary Databricks notebook that the customer has in the Databricks workspace\n",
        "2. Running an arbitrary Python script that the customer has in DBFS\n",
        "3. Running an arbitrary Python script that is available on the local computer (it will be uploaded to DBFS and then run in Databricks)\n",
        "4. Running a JAR job that the customer has in DBFS\n",
        "\n",
        "## Before you begin:\n",
        "\n",
        "1. **Create an Azure Databricks workspace** in the same subscription where you have your Azure Machine Learning workspace. You will need details of this workspace later on to define DatabricksStep. [Click here](https://ms.portal.azure.com/#blade/HubsExtension/Resources/resourceType/Microsoft.Databricks%2Fworkspaces) for more information.\n",
        "2. **Create PAT (access token)**: Manually create a Databricks access token at the Azure Databricks portal. See [this](https://docs.databricks.com/api/latest/authentication.html#generate-a-token) for more information.\n",
        "3. **Add demo notebook to ADB**: This notebook has a sample you can use as is. Launch Azure Databricks attached to your Azure Machine Learning workspace and add a new notebook. \n",
        "4. **Create/attach a Blob storage** for use from ADB"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Add demo notebook to ADB Workspace\n",
        "Copy and paste the code below to create a new notebook in your ADB workspace."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "```python\n",
        "# direct access\n",
        "dbutils.widgets.get(\"myparam\")\n",
        "p = getArgument(\"myparam\")\n",
        "print (\"Param -\\'myparam':\")\n",
        "print (p)\n",
        "\n",
        "dbutils.widgets.get(\"input\")\n",
        "i = getArgument(\"input\")\n",
        "print (\"Param -\\'input':\")\n",
        "print (i)\n",
        "\n",
        "dbutils.widgets.get(\"output\")\n",
        "o = getArgument(\"output\")\n",
        "print (\"Param -\\'output':\")\n",
        "print (o)\n",
        "\n",
        "n = i + \"/testdata.txt\"\n",
        "df = spark.read.csv(n)\n",
        "\n",
        "display (df)\n",
        "\n",
        "data = [('value1', 'value2')]\n",
        "df2 = spark.createDataFrame(data)\n",
        "\n",
        "z = o + \"/output.txt\"\n",
        "df2.write.csv(z)\n",
        "```"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Azure Machine Learning and Pipeline SDK-specific imports"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "import azureml.core\n",
        "from azureml.core.runconfig import JarLibrary\n",
        "from azureml.core.compute import ComputeTarget, DatabricksCompute\n",
        "from azureml.exceptions import ComputeTargetException\n",
        "from azureml.core import Workspace, Experiment\n",
        "from azureml.pipeline.core import Pipeline, PipelineData\n",
        "from azureml.pipeline.steps import DatabricksStep\n",
        "from azureml.core.datastore import Datastore\n",
        "from azureml.data.data_reference import DataReference\n",
        "\n",
        "# Check core SDK version number\n",
        "print(\"SDK version:\", azureml.core.VERSION)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Initialize Workspace\n",
        "\n",
        "Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the [configuration Notebook](https://aka.ms/pl-config) first if you haven't."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "ws = Workspace.from_config()\n",
        "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Attach Databricks compute target\n",
        "Next, you need to add your Databricks workspace to Azure Machine Learning as a compute target and give it a name. You will use this name to refer to your Databricks workspace compute target inside Azure Machine Learning.\n",
        "\n",
        "- **Resource Group** - The resource group name of your Azure Databricks workspace\n",
        "- **Databricks Workspace Name** - The workspace name of your Azure Databricks workspace\n",
        "- **Databricks Access Token** - The access token you created in ADB\n",
        "\n",
        "**The Databricks workspace needs to be in the same subscription as your AML workspace.**"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "sample-databrickscompute-attach"
        ]
      },
      "outputs": [],
      "source": [
        "# Replace with your account info before running.\n",
        " \n",
        "db_compute_name=os.getenv(\"DATABRICKS_COMPUTE_NAME\", \"<my-databricks-compute-name>\") # Databricks compute name\n",
        "db_resource_group=os.getenv(\"DATABRICKS_RESOURCE_GROUP\", \"<my-db-resource-group>\") # Databricks resource group\n",
        "db_workspace_name=os.getenv(\"DATABRICKS_WORKSPACE_NAME\", \"<my-db-workspace-name>\") # Databricks workspace name\n",
        "db_access_token=os.getenv(\"DATABRICKS_ACCESS_TOKEN\", \"<my-access-token>\") # Databricks access token\n",
        " \n",
        "try:\n",
        "    databricks_compute = DatabricksCompute(workspace=ws, name=db_compute_name)\n",
        "    print('Compute target {} already exists'.format(db_compute_name))\n",
        "except ComputeTargetException:\n",
        "    print('Compute not found, will use below parameters to attach new one')\n",
        "    print('db_compute_name {}'.format(db_compute_name))\n",
        "    print('db_resource_group {}'.format(db_resource_group))\n",
        "    print('db_workspace_name {}'.format(db_workspace_name))\n",
        "    print('db_access_token <hidden>')  # Do not print the token itself to the notebook output\n",
        " \n",
        "    config = DatabricksCompute.attach_configuration(\n",
        "        resource_group = db_resource_group,\n",
        "        workspace_name = db_workspace_name,\n",
        "        access_token= db_access_token)\n",
        "    databricks_compute=ComputeTarget.attach(ws, db_compute_name, config)\n",
        "    databricks_compute.wait_for_completion(True)\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Data Connections with Inputs and Outputs\n",
        "The DatabricksStep supports DBFS, Azure Blob and ADLS for inputs and outputs. You will also need to define a [Secrets](https://docs.azuredatabricks.net/user-guide/secrets/index.html) scope to enable authentication to external data sources such as Blob and ADLS from Databricks.\n",
        "\n",
        "- Databricks documentation on [Azure Blob](https://docs.azuredatabricks.net/spark/latest/data-sources/azure/azure-storage.html)\n",
        "- Databricks documentation on [ADLS](https://docs.databricks.com/spark/latest/data-sources/azure/azure-datalake.html)\n",
        "\n",
        "### Type of Data Access\n",
        "Databricks lets you interact with Azure Blob and ADLS in two ways:\n",
        "- **Direct Access**: Databricks allows you to interact with Azure Blob or ADLS URIs directly. The input or output URIs will be mapped to a Databricks widget param in the Databricks notebook.\n",
        "- **Mounting**: You will be supplied with additional parameters and secrets that will enable you to mount your ADLS or Azure Blob input or output location in your Databricks notebook."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Direct Access: Python sample code\n",
        "If you have a data reference named \"input\" it will represent the URI of the input and you can access it directly in the Databricks python notebook like so:"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "```python\n",
        "dbutils.widgets.get(\"input\")\n",
        "y = getArgument(\"input\")\n",
        "df = spark.read.csv(y)\n",
        "```"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Mounting: Python sample code for Azure Blob\n",
        "Given an Azure Blob data reference named \"input\" the following widget params will be made available in the Databricks notebook:"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "```python\n",
        "# This contains the input URI\n",
        "dbutils.widgets.get(\"input\")\n",
        "myinput_uri = getArgument(\"input\")\n",
        "\n",
        "# This contains the name of a Databricks secret (in the predefined \"amlscope\" secret scope)\n",
        "# that contains an access key or SAS for the Azure Blob input (this name is obtained by\n",
        "# appending \"_blob_secretname\" to the name of the input).\n",
        "dbutils.widgets.get(\"input_blob_secretname\") \n",
        "myinput_blob_secretname = getArgument(\"input_blob_secretname\")\n",
        "\n",
        "# This contains the required configuration for mounting\n",
        "dbutils.widgets.get(\"input_blob_config\")\n",
        "myinput_blob_config = getArgument(\"input_blob_config\")\n",
        "\n",
        "# Usage\n",
        "dbutils.fs.mount(\n",
        "  source = myinput_uri,\n",
        "  mount_point = \"/mnt/input\",\n",
        "  extra_configs = {myinput_blob_config:dbutils.secrets.get(scope = \"amlscope\", key = myinput_blob_secretname)})\n",
        "```"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Mounting: Python sample code for ADLS\n",
        "Given an ADLS data reference named \"input\" the following widget params will be made available in the Databricks notebook:"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "```python\n",
        "# This contains the input URI\n",
        "dbutils.widgets.get(\"input\") \n",
        "myinput_uri = getArgument(\"input\")\n",
        "\n",
        "# This contains the client id for the service principal \n",
        "# that has access to the adls input\n",
        "dbutils.widgets.get(\"input_adls_clientid\") \n",
        "myinput_adls_clientid = getArgument(\"input_adls_clientid\")\n",
        "\n",
        "# This contains the name of a Databricks secret (in the predefined \"amlscope\" secret scope) \n",
        "# that contains the secret for the above mentioned service principal\n",
        "dbutils.widgets.get(\"input_adls_secretname\") \n",
        "myinput_adls_secretname = getArgument(\"input_adls_secretname\")\n",
        "\n",
        "# This contains the refresh url for the mounting configs\n",
        "dbutils.widgets.get(\"input_adls_refresh_url\") \n",
        "myinput_adls_refresh_url = getArgument(\"input_adls_refresh_url\")\n",
        "\n",
        "# Usage \n",
        "configs = {\"dfs.adls.oauth2.access.token.provider.type\": \"ClientCredential\",\n",
        "           \"dfs.adls.oauth2.client.id\": myinput_adls_clientid,\n",
        "           \"dfs.adls.oauth2.credential\": dbutils.secrets.get(scope = \"amlscope\", key =myinput_adls_secretname),\n",
        "           \"dfs.adls.oauth2.refresh.url\": myinput_adls_refresh_url}\n",
        "\n",
        "dbutils.fs.mount(\n",
        "  source = myinput_uri,\n",
        "  mount_point = \"/mnt/input\",\n",
        "  extra_configs = configs)\n",
        "```"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Use Databricks from Azure Machine Learning Pipeline\n",
        "To use Databricks as a compute target from Azure Machine Learning Pipeline, a DatabricksStep is used. Let's define a datasource (via DataReference), intermediate data (via PipelineData) and a pipeline parameter (via PipelineParameter) to be used in DatabricksStep."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.pipeline.core import PipelineParameter\n",
        "\n",
        "# Use the default blob storage\n",
        "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n",
        "print('Datastore {} will be used'.format(def_blob_store.name))\n",
        "\n",
        "pipeline_param = PipelineParameter(name=\"my_pipeline_param\", default_value=\"pipeline_param1\")\n",
        "\n",
        "# We are uploading a sample file in the local directory to be used as a datasource\n",
        "def_blob_store.upload_files(files=[\"./testdata.txt\"], target_path=\"dbtest\", overwrite=False)\n",
        "\n",
        "step_1_input = DataReference(datastore=def_blob_store, path_on_datastore=\"dbtest\",\n",
        "                                     data_reference_name=\"input\")\n",
        "\n",
        "step_1_output = PipelineData(\"output\", datastore=def_blob_store)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Add a DatabricksStep\n",
        "Adds a Databricks notebook as a step in a Pipeline.\n",
        "- **\\*name:** Name of the module\n",
        "- **inputs:** List of input connections for data consumed by this step. Fetch this inside the notebook using dbutils.widgets.get(\"input\")\n",
        "- **outputs:** List of output port definitions for outputs produced by this step. Fetch this inside the notebook using dbutils.widgets.get(\"output\")\n",
        "- **existing_cluster_id:** Cluster ID of an existing Interactive cluster on the Databricks workspace. If you are providing this, do not provide any of the parameters below that are used to create a new cluster such as spark_version, node_type, etc.\n",
        "- **spark_version:** Version of spark for the databricks run cluster. default value: 4.0.x-scala2.11\n",
        "- **node_type:** Azure vm node types for the databricks run cluster. default value: Standard_D3_v2\n",
        "- **num_workers:** Specifies a static number of workers for the databricks run cluster\n",
        "- **min_workers:** Specifies a min number of workers to use for auto-scaling the databricks run cluster\n",
        "- **max_workers:** Specifies a max number of workers to use for auto-scaling the databricks run cluster\n",
        "- **spark_env_variables:** Spark environment variables for the databricks run cluster (dictionary of {str:str}). default value: {'PYSPARK_PYTHON': '/databricks/python3/bin/python3'}\n",
        "- **notebook_path:** Path to the notebook in the databricks instance. If you are providing this, do not provide python script related parameters or JAR related parameters.\n",
        "- **notebook_params:** Parameters for the databricks notebook (dictionary of {str:str}). Fetch this inside the notebook using dbutils.widgets.get(\"myparam\")\n",
        "- **python_script_path:** The path to the python script in the DBFS or S3. If you are providing this, do not provide python_script_name which is used for uploading script from local machine.\n",
        "- **python_script_params:** Parameters for the python script (list of str)\n",
        "- **main_class_name:** The name of the entry point in a JAR module. If you are providing this, do not provide any python script or notebook related parameters.\n",
        "- **jar_params:** Parameters for the JAR module (list of str)\n",
        "- **python_script_name:** name of a python script on your local machine (relative to source_directory). If you are providing this do not provide python_script_path which is used to execute a remote python script; or any of the JAR or notebook related parameters.\n",
        "- **source_directory:** folder that contains the script and other files\n",
        "- **hash_paths:** list of paths to hash to detect a change in source_directory (script file is always hashed)\n",
        "- **run_name:** Name in databricks for this run\n",
        "- **timeout_seconds:** Timeout for the databricks run\n",
        "- **runconfig:** Runconfig to use. Either pass runconfig or each library type as a separate parameter but do not mix the two\n",
        "- **maven_libraries:** maven libraries for the databricks run\n",
        "- **pypi_libraries:** pypi libraries for the databricks run\n",
        "- **egg_libraries:** egg libraries for the databricks run\n",
        "- **jar_libraries:** jar libraries for the databricks run\n",
        "- **rcran_libraries:** rcran libraries for the databricks run\n",
        "- **compute_target:** Azure Databricks compute\n",
        "- **allow_reuse:** Whether the step should reuse previous results when run with the same settings/inputs\n",
        "- **version:** Optional version tag to denote a change in functionality for the step\n",
        "\n",
        "\\* *denotes required fields*  \n",
        "*You must provide exactly one of num_workers or min_workers and max_workers parameters*  \n",
        "*You must provide exactly one of databricks_compute or databricks_compute_name parameters*\n",
        "\n",
        "## Use runconfig to specify library dependencies\n",
        "You can use a runconfig to specify the library dependencies for your cluster in Databricks. The runconfig will contain a databricks section as follows:\n",
        "\n",
        "```yaml\n",
        "environment:\n",
        "# Databricks details\n",
        "  databricks:\n",
        "# List of maven libraries.\n",
        "    mavenLibraries:\n",
        "    - coordinates: org.jsoup:jsoup:1.7.1\n",
        "      repo: ''\n",
        "      exclusions:\n",
        "      - slf4j:slf4j\n",
        "      - '*:hadoop-client'\n",
        "# List of PyPi libraries\n",
        "    pypiLibraries:\n",
        "    - package: beautifulsoup4\n",
        "      repo: ''\n",
        "# List of RCran libraries\n",
        "    rcranLibraries:\n",
        "    -\n",
        "# Coordinates.\n",
        "      package: ada\n",
        "# Repo\n",
        "      repo: http://cran.us.r-project.org\n",
        "# List of JAR libraries\n",
        "    jarLibraries:\n",
        "    -\n",
        "# Coordinates.\n",
        "      library: dbfs:/mnt/libraries/library.jar\n",
        "# List of Egg libraries\n",
        "    eggLibraries:\n",
        "    -\n",
        "# Coordinates.\n",
        "      library: dbfs:/mnt/libraries/library.egg\n",
        "```\n",
        "\n",
        "You can then create a RunConfiguration object using this file and pass it as the runconfig parameter to DatabricksStep.\n",
        "```python\n",
        "from azureml.core.runconfig import RunConfiguration\n",
        "\n",
        "runconfig = RunConfiguration()\n",
        "runconfig.load(path='<directory_where_runconfig_is_stored>', name='<runconfig_file_name>')\n",
        "```"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### 1. Running the demo notebook already added to the Databricks workspace\n",
        "Create a notebook in the Azure Databricks workspace, and provide the path to that notebook as the value of the environment variable \"DATABRICKS_NOTEBOOK_PATH\". This sets the variable `notebook_path` when you run the code cell below.\n",
        "\n",
        "You can find your notebook's path in the Azure Databricks UI by hovering over the notebook's title. A typical notebook path looks like this: `/Users/example@databricks.com/example`. See [Databricks Workspace](https://docs.azuredatabricks.net/user-guide/workspace.html) to learn about the folder structure.\n",
        "\n",
        "Note: A DataPath `PipelineParameter` should be provided in the list of inputs. Such parameters can be accessed by the datapath `name`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "notebook_path=os.getenv(\"DATABRICKS_NOTEBOOK_PATH\", \"<my-databricks-notebook-path>\") # Databricks notebook path\n",
        "\n",
        "dbNbStep = DatabricksStep(\n",
        "    name=\"DBNotebookInWS\",\n",
        "    inputs=[step_1_input],\n",
        "    outputs=[step_1_output],\n",
        "    num_workers=1,\n",
        "    notebook_path=notebook_path,\n",
        "    notebook_params={'myparam': 'testparam', \n",
        "                     'myparam2': pipeline_param},\n",
        "    run_name='DB_Notebook_demo',\n",
        "    compute_target=databricks_compute,\n",
        "    allow_reuse=True\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Build and submit the Experiment\n",
        "\n",
        "Note: The default value of `pipeline_param` is used unless a different value is specified in the pipeline parameters at submission."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "steps = [dbNbStep]\n",
        "pipeline = Pipeline(workspace=ws, steps=steps)\n",
        "pipeline_run = Experiment(ws, 'DB_Notebook_demo').submit(pipeline)\n",
        "pipeline_run.wait_for_completion()"
      ]
    },
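    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As a minimal sketch of overriding that default, the `pipeline_parameters` argument of `submit` can carry a new value keyed by the parameter name (`my_pipeline_param`, as defined earlier). The value `pipeline_param2` is a placeholder chosen for illustration, and the snippet assumes the `ws` and `pipeline` objects created in the cells above.\n",
        "\n",
        "```python\n",
        "# Override the default value of my_pipeline_param for this run only.\n",
        "# 'pipeline_param2' is a placeholder value, not something this notebook defines.\n",
        "pipeline_run = Experiment(ws, 'DB_Notebook_demo').submit(\n",
        "    pipeline,\n",
        "    pipeline_parameters={'my_pipeline_param': 'pipeline_param2'}\n",
        ")\n",
        "pipeline_run.wait_for_completion()\n",
        "```"
      ]
    },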
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### View Run Details"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.widgets import RunDetails\n",
        "RunDetails(pipeline_run).show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### 2. Running a Python script from DBFS\n",
        "This shows how to run a Python script in DBFS. \n",
        "\n",
        "To complete this, you first need to upload the Python script from your local machine to DBFS using the [CLI](https://docs.azuredatabricks.net/user-guide/dbfs-databricks-file-system.html). The CLI command is given below:\n",
        "\n",
        "```\n",
        "dbfs cp ./train-db-dbfs.py dbfs:/train-db-dbfs.py\n",
        "```\n",
        "\n",
        "The code in the below cell assumes that you have completed the previous step of uploading the script `train-db-dbfs.py` to the root folder in DBFS.\n",
        "\n",
        "Note: `pipeline_param` adds two values to `python_script_params`: a name followed by its value. The name has the format `--MY_PIPELINE_PARAM`. In this case, `python_script_params` will be `[\"arg1\", \"--MY_PIPELINE_PARAM\", \"pipeline_param1\", \"arg2\"]`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "python_script_path = os.getenv(\"DATABRICKS_PYTHON_SCRIPT_PATH\", \"<my-databricks-python-script-path>\") # Databricks python script path\n",
        "\n",
        "dbPythonInDbfsStep = DatabricksStep(\n",
        "    name=\"DBPythonInDBFS\",\n",
        "    inputs=[step_1_input],\n",
        "    num_workers=1,\n",
        "    python_script_path=python_script_path,\n",
        "    python_script_params=['arg1', pipeline_param, 'arg2'],  # ordered list of arguments\n",
        "    run_name='DB_Python_demo',\n",
        "    compute_target=databricks_compute,\n",
        "    allow_reuse=True\n",
        ")"
      ]
    },
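    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The contents of `train-db-dbfs.py` are not shown in this notebook. As a hedged sketch, a script receiving the argument list described in the note above could read it with `argparse`. The names `first_arg` and `second_arg` are hypothetical, and the explicit list passed to the parser stands in for the arguments the script would receive on the command line.\n",
        "\n",
        "```python\n",
        "import argparse\n",
        "\n",
        "# Hypothetical argument names; adapt them to what your script expects.\n",
        "parser = argparse.ArgumentParser()\n",
        "parser.add_argument('first_arg')\n",
        "parser.add_argument('--MY_PIPELINE_PARAM', dest='my_pipeline_param')\n",
        "parser.add_argument('second_arg')\n",
        "\n",
        "# parse_known_args tolerates any extra arguments instead of raising an error.\n",
        "# In a real script, omit the list so that the command-line arguments are parsed.\n",
        "args, unknown = parser.parse_known_args(\n",
        "    ['arg1', '--MY_PIPELINE_PARAM', 'pipeline_param1', 'arg2']\n",
        ")\n",
        "print(args.first_arg, args.my_pipeline_param, args.second_arg)\n",
        "```"
      ]
    },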
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Build and submit the Experiment"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "steps = [dbPythonInDbfsStep]\n",
        "pipeline = Pipeline(workspace=ws, steps=steps)\n",
        "pipeline_run = Experiment(ws, 'DB_Python_demo').submit(pipeline)\n",
        "pipeline_run.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### View Run Details"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.widgets import RunDetails\n",
        "RunDetails(pipeline_run).show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### 3. Running a Python script in Databricks that is currently on the local computer\n",
        "To run a Python script that is currently on your local computer, follow the instructions below. \n",
        "\n",
        "The code below assumes that you have `train-db-local.py` in the `source_directory` subdirectory under the current working directory. \n",
        "\n",
        "The best practice is to use a separate folder for each step's script and its dependent files, and to specify that folder as the `source_directory` for the step. This reduces the size of the snapshot created for the step (only the specific folder is snapshotted). Since a change to any file in the `source_directory` triggers a re-upload of the snapshot, this also preserves step reuse when nothing in the step's `source_directory` has changed.\n",
        "\n",
        "In this case, the Python script will be uploaded first to DBFS, and then the script will be run in Databricks."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "python_script_name = \"train-db-local.py\"\n",
        "source_directory = \"./databricks_train\"\n",
        "\n",
        "dbPythonInLocalMachineStep = DatabricksStep(\n",
        "    name=\"DBPythonInLocalMachine\",\n",
        "    inputs=[step_1_input],\n",
        "    num_workers=1,\n",
        "    python_script_name=python_script_name,\n",
        "    source_directory=source_directory,\n",
        "    run_name='DB_Python_Local_demo',\n",
        "    compute_target=databricks_compute,\n",
        "    allow_reuse=True\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Build and submit the Experiment"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "steps = [dbPythonInLocalMachineStep]\n",
        "pipeline = Pipeline(workspace=ws, steps=steps)\n",
        "pipeline_run = Experiment(ws, 'DB_Python_Local_demo').submit(pipeline)\n",
        "pipeline_run.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### View Run Details"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.widgets import RunDetails\n",
        "RunDetails(pipeline_run).show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### 4. Running a JAR job that is already added in DBFS\n",
        "To run a JAR job that is already uploaded to DBFS, follow the instructions below. You will first upload the JAR file to DBFS using the [CLI](https://docs.azuredatabricks.net/user-guide/dbfs-databricks-file-system.html).\n",
        "\n",
        "The code in the cell below assumes that you have uploaded `train-db-dbfs.jar` to the root folder in DBFS. You can upload `train-db-dbfs.jar` to the root folder in DBFS with the following command, so that you can use `jar_library_dbfs_path = \"dbfs:/train-db-dbfs.jar\"`:\n",
        "\n",
        "```\n",
        "dbfs cp ./train-db-dbfs.jar dbfs:/train-db-dbfs.jar\n",
        "```\n",
        "\n",
        "Note: `pipeline_param` adds two values to `jar_params`: a name followed by its value. The name has the format `--MY_PIPELINE_PARAM`. In this case, `jar_params` will be `[\"arg1\", \"--MY_PIPELINE_PARAM\", \"pipeline_param1\", \"arg2\"]`."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "main_jar_class_name = \"com.microsoft.aeva.Main\"\n",
        "jar_library_dbfs_path = os.getenv(\"DATABRICKS_JAR_LIB_PATH\", \"<my-databricks-jar-lib-path>\") # Databricks jar library path\n",
        "\n",
        "dbJarInDbfsStep = DatabricksStep(\n",
        "    name=\"DBJarInDBFS\",\n",
        "    inputs=[step_1_input],\n",
        "    num_workers=1,\n",
        "    main_class_name=main_jar_class_name,\n",
        "    jar_params=['arg1', pipeline_param, 'arg2'],  # ordered list of arguments\n",
        "    run_name='DB_JAR_demo',\n",
        "    jar_libraries=[JarLibrary(jar_library_dbfs_path)],\n",
        "    compute_target=databricks_compute,\n",
        "    allow_reuse=True\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Build and submit the Experiment"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "steps = [dbJarInDbfsStep]\n",
        "pipeline = Pipeline(workspace=ws, steps=steps)\n",
        "pipeline_run = Experiment(ws, 'DB_JAR_demo').submit(pipeline)\n",
        "pipeline_run.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### View Run Details"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.widgets import RunDetails\n",
        "RunDetails(pipeline_run).show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Next: ADLA as a Compute Target\n",
        "To use ADLA as a compute target from Azure Machine Learning Pipeline, an AdlaStep is used. This [notebook](https://aka.ms/pl-adla) demonstrates the use of AdlaStep in Azure Machine Learning Pipeline."
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "sanpil"
      }
    ],
    "category": "tutorial",
    "compute": [
      "Azure Databricks"
    ],
    "datasets": [
      "Custom"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "Azure ML, Azure Databricks"
    ],
    "friendly_name": "How to use DatabricksStep with AML Pipelines",
    "kernelspec": {
      "display_name": "Python 3.6",
      "language": "python",
      "name": "python36"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.2"
    },
    "order_index": 5,
    "star_tag": [
      "featured"
    ],
    "tags": [
      "None"
    ],
    "task": "Demonstrates the use of DatabricksStep"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}